package thread;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import code.ClientChannelHandler;

/**
 * 基于disruptor的线程池
 * @author jinmiao
 * 2014-9-12 上午9:51:09
 */
public class BaseDisruptorExecutor 
{
	private static final Logger log = LoggerFactory.getLogger(BaseDisruptorExecutor.class); 
	protected List<IMessageProcessor> executor = new ArrayList<IMessageProcessor>();
	
	protected Map<String, IMessageProcessor> executorMap = new HashMap<String, IMessageProcessor>();
	
	protected AtomicInteger index = new AtomicInteger();
	
	
	/**
	 * 创造一个线程对象
	 * @param threadName
	 * @return
	 */
	protected IMessageProcessor createDisruptorProcessor(String threadName,int threadNum)
	{
		if(executorMap.containsKey(threadName))
			throw new RuntimeException("有相同名称的线程池");
		IMessageProcessor process = null;
		if(threadNum<2)
		{
			DisruptorSingleProcessor singleProcess = new DisruptorSingleProcessor(threadName);
			process = singleProcess;
		}
		else
		{
			DisruptorMultiProcessor multiProcess = new DisruptorMultiProcessor(threadName,threadNum);
			process = multiProcess;
		}
		executorMap.put(threadName, process);
		executor.add(process);
		process.start();
		return process;
	}
	
	/**
	 * 发布一个线程任务
	 * @param threadName
	 * @param task  可以是回调任务 {@link DisruptorAsyncBackTask}  或者  {@link DisruptorAsyncTask}
	 * @throws Throwable 
	 */
	public void publishTask(ITask task) throws Throwable
	{
		if(task instanceof DistriptorTimerTask)
		{
			throw new UnsupportedOperationException("plase use pushTimerTask(DistriptorTimerTask task)  method");
		} 
		
		if(task instanceof DisruptorAsyncBackTask)
			((DisruptorAsyncBackTask)task).executor = this;
		if(task.getExecuteName()==null)
		{
			task.execute();
			return;
		}
		IMessageProcessor process = executorMap.get(task.getExecuteName());
		if(process==null)
		{
			log.error("threadService is not exist,threadServiceName = "+task.getExecuteName());
			return;
		}
		process.put(task);
	}
	
	/**
	 * 放入一个定时任务
	 * @param task
	 */
	public void pushTimerTask(DistriptorTimerTask task)
	{
		if(task.getExecuteName()==null)
		{
			task.execute();
			return;
		}
		IMessageProcessor process = executorMap.get(task.getExecuteName());
		if(process==null)
		{
			log.error("threadService is not exist,threadServiceName = "+task.getExecuteName());
			return;
		}
		process.putTimerTask(task);
	}
	
	
	
	
	public void stop()
	{
		for(IMessageProcessor process:executor)
		{
			process.stop();
		}
	}
	
	
	
	/**
	 * 从线程池中按算法获得一个线程对象
	 * @return
	 */
	public IMessageProcessor getAutoDisruptorProcessor()
	{
		int index = this.index.incrementAndGet();
		return executor.get(index%executor.size());
	}
	
//	public IMessageProcessor getFirstDisruptorProcessor()
//	{
//		if(executor.isEmpty())
//			return null;
//		return executor.get(0);
//	}
	
	
	
	/**
	 * 根据名称获得线程
	 * @param threadName
	 * @return
	 */
	public IMessageProcessor getProcessByName(String threadName)
	{
		return executorMap.get(threadName);
	}
	
	
//	public static void main(String[] args) {
//		BaseDisruptorExecutor executor = new BaseDisruptorExecutor();
//		IMessageProcessor process = executor.createDisruptorProcessor("main",1);
//		executor.createDisruptorProcessor("async",2);
//		TestAsyncTask task = new TestAsyncTask();
//		task.executor = executor;
//		process.put(task);
//	}

}
